package day16;

import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * @author: 王丹
 * @create: 2021-07-23 17:54
 * TaskDisposeUtils是一个并行处理的工具类，可以传入n个任务内部使用线程池进行处理，等待所有任务都处理完成之后，
 * 方法才会返回。比如我们发送短信，系统中有1万条短信，我们使用上面的工具，每次取100条并行发送，
 * 待100个都处理完毕之后，再取一批按照同样的逻辑发送。
 **/
public class TaskDisposeUtils {
    public static final int POOL_SIZE;
    static {
        POOL_SIZE = Integer.max(Runtime.getRuntime().availableProcessors(), 5);
    }

    public static <T> void dispose(List<T> taskList, Consumer<T> consumer) throws InterruptedException {
        dispose(true, POOL_SIZE, taskList, consumer);
    }

    public static <T> void dispose(boolean moreThread, int poolSize, List<T> taskList, Consumer<T> consumer) throws InterruptedException {
        if (!CollectionUtils.isEmpty(taskList)) {
            if (moreThread && poolSize > 1) {
                poolSize = Math.min(poolSize, taskList.size());
                ExecutorService executorService = null;

                try {
                    executorService = Executors.newFixedThreadPool(poolSize);
                    CountDownLatch countDownLatch = new CountDownLatch(taskList.size());

                    for (T item : taskList) {
                        executorService.execute(() -> {
                            try {
                                consumer.accept(item);
                            } finally {
                                countDownLatch.countDown();
                            }

                        });
                    }

                    countDownLatch.await();
                } finally {
                    if (executorService != null) {
                        executorService.shutdown();
                    }

                }
            } else {
                for (T item : taskList) {
                    consumer.accept(item);
                }
            }
        }

    }


    public static void main(String[] args) throws InterruptedException {
        List<Integer> list = (List) Stream.iterate(1, (a) -> {
            return a + 1;
        }).limit(10L).collect(Collectors.toList());
        dispose(list, (item) -> {
            try {
                long startTime = System.currentTimeMillis();
                TimeUnit.SECONDS.sleep((long) item);
                long endTime = System.currentTimeMillis();
                System.out.println(System.currentTimeMillis() + ",任务" + item + "执行完毕，耗时:" + (endTime - startTime));
            } catch (InterruptedException var5) {
                var5.printStackTrace();
            }

        });
        System.out.println(list + "中的任务都处理完毕!");
    }
}
